import pandas as pd
import xlwings as xw
import numpy as np
import os
from typing import Dict, List, Tuple, Any, Optional
class EnhancedCostAnalysisTemplate:
"""
增强版成本分析模板处理器
解决表间关联和数据完整性问题
"""
def __init__(self):
# 主键映射关系
self.key_mappings = {}
def process_single_file(self, file_path: str, sheet_name: str = None) -> Dict[str, Any]:
"""
处理单个成本表文件,建立完整的关联关系
"""
app = xw.App(visible=False)
wb = app.books.open(file_path)
if sheet_name is None:
sheet_name = wb.sheets[0].name
ws = wb.sheets[sheet_name]
result = {
'file_name': os.path.basename(file_path),
'sheet_name': sheet_name,
'product_info': {},
'cost_details': None,
'summary': {},
'comments': [],
'process_details': None, # 改为复数,包含多个工艺信息
'material_parameters': None, # 材料参数表
'key_relationships': {} # 存储主键关联关系
}
try:
# 1. 提取产品基本信息,并生成主键
result['product_info'] = self.extract_product_info_with_key(ws)
product_key = result['product_info'].get('product_key')
# 2. 提取成本明细,关联产品主键
cost_details = self.extract_cost_details_with_relationships(ws, product_key)
result['cost_details'] = cost_details
# 3. 提取汇总信息,关联产品主键
result['summary'] = self.extract_summary_with_key(ws, product_key)
# 4. 提取批注信息
result['comments'] = self.extract_comments(ws)
# 5. 提取完整的工艺信息表(多个半成品)
process_details = self.extract_complete_process_info(ws, product_key)
result['process_details'] = process_details
# 6. 提取材料参数信息
result['material_parameters'] = self.extract_material_parameters(ws, product_key)
# 7. 建立完整的主键关联映射
result['key_relationships'] = self.build_key_relationships(
product_key, cost_details, process_details
)
except Exception as e:
print(f"处理文件时出错: {e}")
raise
finally:
wb.close()
app.quit()
return result
def extract_product_info_with_key(self, ws) -> Dict[str, Any]:
"""提取产品基本信息并生成主键"""
product_info = {}
# 查找产品名称
product_name_row = self.find_row_by_text(ws, "产品名称")
if product_name_row:
product_name = ws.range(f"C{product_name_row}").value
product_info['产品名称'] = product_name
# 查找产品编码
product_code_row = self.find_row_by_text(ws, "产品编码")
if product_code_row:
product_code = ws.range(f"L{product_code_row}").value
product_info['产品编码'] = product_code
# 生成产品主键
if product_info.get('产品编码'):
product_info['product_key'] = f"PROD_{product_info['产品编码']}"
else:
product_info['product_key'] = f"PROD_{hash(str(product_info))}"
# 提取其他基本信息
date_row = self.find_row_by_text(ws, "日期")
if date_row:
product_info['日期'] = ws.range(f"P{date_row}").value
# 提取制单、审核等信息
maker_row = self.find_row_by_text(ws, "制单")
if maker_row:
product_info['制单'] = ws.range(f"P{maker_row}").value
product_info['审核'] = ws.range(f"Q{maker_row}").value
product_info['批准'] = ws.range(f"R{maker_row}").value
product_info['流程号'] = ws.range(f"S{maker_row}").value
return product_info
def extract_cost_details_with_relationships(self, ws, product_key: str) -> pd.DataFrame:
"""提取成本明细并建立关联关系"""
header_row = self.find_header_row(ws, "序号")
if header_row is None:
raise ValueError("未找到成本明细表头")
headers = self.get_headers(ws, header_row)
end_row = self.find_data_end_row(ws, header_row + 1)
if end_row <= header_row:
raise ValueError("未找到成本明细数据")
# 读取数据区域
data_range = ws.range(f"A{header_row + 1}:T{end_row}")
data_values = data_range.value
# 创建DataFrame
df = pd.DataFrame(data_values, columns=headers)
df = self.clean_cost_data(df)
# 添加主键关联
df['product_key'] = product_key
df['cost_item_id'] = [f"COST_{product_key}_{i + 1}" for i in range(len(df))]
# 提取类型和维度值信息(用于关联工艺信息)
df = self.extract_type_and_dimension(ws, header_row + 1, end_row, df)
return df
def extract_type_and_dimension(self, ws, start_row: int, end_row: int, df: pd.DataFrame) -> pd.DataFrame:
"""提取类型和维度值信息"""
types = []
dimensions = []
for row in range(start_row, end_row + 1):
try:
# 提取类型(U列)
cell_type = ws.range(f"U{row}").value
types.append(cell_type if cell_type is not None else "")
# 提取维度值(V列)
cell_dim = ws.range(f"V{row}").value
dimensions.append(cell_dim if cell_dim is not None else "")
except:
types.append("")
dimensions.append("")
# 确保长度匹配
if len(types) == len(df):
df['类型'] = types
df['维度值'] = dimensions
return df
def extract_summary_with_key(self, ws, product_key: str) -> Dict[str, Any]:
"""提取汇总信息并关联主键"""
summary = {'product_key': product_key}
# 查找合计行
total_row = self.find_row_by_text(ws, "合计", start_row=10, end_row=30)
if total_row:
# 提取主要成本数据
summary['材料总金额'] = ws.range(f"M{total_row}").value
summary['直接人工合计'] = ws.range(f"O{total_row}").value
summary['制造费用合计'] = ws.range(f"P{total_row}").value
summary['制造成本合计'] = ws.range(f"Q{total_row}").value
summary['GRF费用'] = ws.range(f"R{total_row}").value
summary['S费用'] = ws.range(f"S{total_row}").value
summary['总成本'] = ws.range(f"T{total_row}").value
# 查找价格信息
for row in range(total_row - 5, total_row + 5):
try:
cell_value = ws.range(f"U{row}").value
if cell_value:
if "基准价格" in str(cell_value):
summary['基准价格'] = ws.range(f"V{row}").value
elif "销售价格" in str(cell_value):
summary['销售价格'] = ws.range(f"V{row}").value
summary['毛利率'] = ws.range(f"W{row}").value
except:
continue
return summary
def extract_complete_process_info(self, ws, product_key: str) -> pd.DataFrame:
"""提取完整的工艺信息表(多个半成品)"""
# 查找工艺信息表头
process_header_row = self.find_row_by_text(ws, "产品代码", start_row=25, end_row=40)
if not process_header_row:
return None
# 获取工艺信息表头
process_headers = [
"产品代码", "模穴数", "单重(G)", "毛重(G)", "材料利用率",
"水口比重", "原料描述", "材料单价", "材料成本", "生产机台",
"成型周期/冲速", "小时产量", "小时费用(元)", "制造费用",
"制造成本", "装配产能(PCS/H)", "装配设备", "实际成本", "水口比例"
]
# 查找数据结束行
process_end_row = self.find_process_data_end_row(ws, process_header_row + 1)
if process_end_row <= process_header_row:
return None
# 读取工艺数据
data_values = []
for row in range(process_header_row + 1, process_end_row + 1):
row_data = []
for col in range(1, 20): # A到S列
try:
cell_value = ws.range(row, col).value
row_data.append(cell_value)
except:
row_data.append(None)
data_values.append(row_data)
# 创建DataFrame
df = pd.DataFrame(data_values, columns=process_headers)
df = df.dropna(how='all') # 删除空行
# 添加主键关联
df['product_key'] = product_key
df['process_item_id'] = [f"PROC_{product_key}_{i + 1}" for i in range(len(df))]
return df
def extract_material_parameters(self, ws, product_key: str) -> pd.DataFrame:
"""提取材料参数信息"""
# 查找可能包含材料参数的区域
material_data = []
# 在成本明细中提取材料相关信息
header_row = self.find_header_row(ws, "序号")
if header_row:
end_row = self.find_data_end_row(ws, header_row + 1)
for row in range(header_row + 1, end_row + 1):
try:
product_name = ws.range(f"B{row}").value
material_desc = ws.range(f"F{row}").value
usage = ws.range(f"K{row}").value
unit_price = ws.range(f"L{row}").value
if product_name and material_desc:
material_data.append({
'产品名称': product_name,
'材料描述': material_desc,
'用量': usage,
'材料单价': unit_price,
'product_key': product_key
})
except:
continue
return pd.DataFrame(material_data)
def find_process_data_end_row(self, ws, start_row: int) -> int:
"""查找工艺数据结束行"""
row = start_row
while row < start_row + 20: # 限制搜索范围
try:
# 检查产品代码列是否有值
cell_value = ws.range(f"A{row}").value
if cell_value is None or cell_value == "":
return row - 1
row += 1
except:
return row - 1
return row - 1
def build_key_relationships(self, product_key: str, cost_details: pd.DataFrame,
process_details: pd.DataFrame) -> Dict[str, Any]:
"""建立完整的主键关联关系"""
relationships = {
'product_key': product_key,
'cost_to_process': {},
'process_to_cost': {}
}
if cost_details is not None and process_details is not None:
# 通过产品代码建立成本明细与工艺信息的关联
for _, cost_row in cost_details.iterrows():
if cost_row.get('类型') == '半成品' and cost_row.get('产品编码'):
product_code = cost_row['产品编码']
# 在工艺信息中查找匹配的产品代码
matching_process = process_details[process_details['产品代码'] == product_code]
if not matching_process.empty:
cost_id = cost_row.get('cost_item_id')
process_id = matching_process.iloc[0].get('process_item_id')
relationships['cost_to_process'][cost_id] = process_id
relationships['process_to_cost'][process_id] = cost_id
return relationships
# 保留原有的辅助方法
def find_header_row(self, ws, header_text: str) -> Optional[int]:
for row in range(1, 50):
try:
cell_value = ws.range(f"A{row}").value
if cell_value and header_text in str(cell_value):
return row
except:
continue
return None
def find_row_by_text(self, ws, text: str, start_row: int = 1, end_row: int = 50) -> Optional[int]:
for row in range(start_row, end_row + 1):
try:
cell_value = ws.range(f"A{row}").value
if cell_value and text in str(cell_value):
return row
except:
continue
return None
def get_headers(self, ws, header_row: int) -> List[str]:
headers = []
for col in range(1, 21): # A到T列
try:
cell_value = ws.range(header_row, col).value
headers.append(cell_value if cell_value is not None else f"Col_{col}")
except:
headers.append(f"Col_{col}")
return headers
def find_data_end_row(self, ws, start_row: int) -> int:
row = start_row
while row < start_row + 50: # 限制搜索范围
try:
cell_value = ws.range(f"A{row}").value
if cell_value is None or (isinstance(cell_value, str) and "合计" in cell_value):
return row - 1
if isinstance(cell_value, (int, float)) and cell_value > 0:
row += 1
else:
return row - 1
except:
return row - 1
return row - 1
def clean_cost_data(self, df: pd.DataFrame) -> pd.DataFrame:
df = df.dropna(how='all')
df = df.dropna(axis=1, how='all')
df = df.reset_index(drop=True)
numeric_columns = ['用量', '材料单价', '材料金额', '材料比重', '直接人工', '制造费用']
for col in numeric_columns:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
return df
def extract_comments(self, ws) -> List[Dict[str, Any]]:
comments = []
try:
all_cells = ws.used_range
for row in all_cells.rows:
for cell in row:
try:
if hasattr(cell, 'comment') and cell.comment:
comment_text = cell.comment.text if cell.comment.text else ''
comments.append({
'cell_address': cell.address,
'cell_value': cell.value,
'comment_text': comment_text.strip(),
'row': cell.row,
'column': cell.column
})
except:
continue
except Exception as e:
print(f"提取批注时出错: {e}")
return comments
def export_enhanced_to_excel(result, output_path):
"""将增强版处理结果导出到Excel文件"""
app = xw.App(visible=False)
wb = app.books.add()
try:
# 1. 产品信息表(带主键)
product_sheet = wb.sheets.add("产品信息")
product_info = result['product_info']
product_sheet.range('A1').value = "产品信息(主表)"
row = 2
for key, value in product_info.items():
product_sheet.range(f'A{row}').value = key
product_sheet.range(f'B{row}').value = value
row += 1
# 2. 成本明细表(关联产品主键)
if result['cost_details'] is not None:
cost_sheet = wb.sheets.add("成本明细")
cost_sheet.range('A1').value = "成本明细(关联表)"
cost_sheet.range('A2').value = result['cost_details']
# 3. 汇总信息表(关联产品主键)
summary_sheet = wb.sheets.add("汇总信息")
summary_sheet.range('A1').value = "汇总信息(关联表)"
summary_info = result['summary']
row = 2
for key, value in summary_info.items():
summary_sheet.range(f'A{row}').value = key
summary_sheet.range(f'B{row}').value = value
row += 1
# 4. 工艺信息表(多个半成品,关联产品主键)
if result['process_details'] is not None and not result['process_details'].empty:
process_sheet = wb.sheets.add("工艺信息")
process_sheet.range('A1').value = "工艺信息(关联表)"
process_sheet.range('A2').value = result['process_details']
# 5. 材料参数表
if result['material_parameters'] is not None and not result['material_parameters'].empty:
material_sheet = wb.sheets.add("材料参数")
material_sheet.range('A1').value = "材料参数(关联表)"
material_sheet.range('A2').value = result['material_parameters']
# 6. 关联关系表
relation_sheet = wb.sheets.add("关联关系")
relation_sheet.range('A1').value = "主键关联关系"
relationships = result['key_relationships']
row = 2
relation_sheet.range(f'A{row}').value = "产品主键"
relation_sheet.range(f'B{row}').value = relationships.get('product_key')
row += 2
relation_sheet.range(f'A{row}').value = "成本→工艺关联"
row += 1
for cost_id, process_id in relationships.get('cost_to_process', {}).items():
relation_sheet.range(f'A{row}').value = cost_id
relation_sheet.range(f'B{row}').value = process_id
row += 1
# 7. 批注信息表
if result['comments']:
comments_sheet = wb.sheets.add("批注信息")
comments_df = pd.DataFrame(result['comments'])
comments_sheet.range('A1').value = comments_df
wb.save(output_path)
print(f"完整数据已成功导出到: {output_path}")
except Exception as e:
print(f"导出数据时出错: {e}")
finally:
wb.close()
app.quit()
def save_to_database_enhanced(result, connection_string: str, table_prefix: str = ""):
"""将增强版数据保存到数据库"""
# 保存产品信息(主表)
product_info = result['product_info']
# 这里可以添加数据库保存逻辑
# 保存成本明细(关联表)
if result['cost_details'] is not None:
df_cost = result['cost_details']
# df_cost.to_sql(f"{table_prefix}cost_details", con=connection_string, if_exists='append', index=False)
print(f"成本明细数据已保存,记录数: {len(df_cost)}")
# 保存工艺信息(关联表)
if result['process_details'] is not None and not result['process_details'].empty:
df_process = result['process_details']
# df_process.to_sql(f"{table_prefix}process_details", con=connection_string, if_exists='append', index=False)
print(f"工艺信息数据已保存,记录数: {len(df_process)}")
# 保存材料参数
if result['material_parameters'] is not None and not result['material_parameters'].empty:
df_material = result['material_parameters']
# df_material.to_sql(f"{table_prefix}material_params", con=connection_string, if_exists='append', index=False)
print(f"材料参数数据已保存,记录数: {len(df_material)}")
# 保存关联关系
relationships = result['key_relationships']
# 这里可以保存关联关系到数据库
print("所有数据表已建立完整关联关系,可以用于数据库查询和关联分析")
# 使用示例
if __name__ == "__main__":
path = r"E:\韦瑞奎\pdf\整理成本表到数据库"
# 创建增强版处理器
processor = EnhancedCostAnalysisTemplate()
# 处理单个文件
file_path = os.path.join(path, "1884-288-38-8.xlsx")
result = processor.process_single_file(file_path)
# 打印处理结果
print(f"文件: {result['file_name']}")
print(f"产品主键: {result['product_info'].get('product_key')}")
print(f"成本明细记录数: {len(result['cost_details']) if result['cost_details'] is not None else 0}")
print(f"工艺信息记录数: {len(result['process_details']) if result['process_details'] is not None else 0}")
print(f"材料参数记录数: {len(result['material_parameters']) if result['material_parameters'] is not None else 0}")
print(f"关联关系数量: {len(result['key_relationships'].get('cost_to_process', {}))}")
# 导出完整数据
output_file = os.path.join(path, "完整导出数据.xlsx")
export_enhanced_to_excel(result, output_file)
# 模拟数据库保存
# save_to_database_enhanced(result, "your_connection_string")